-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathintermediate.go
449 lines (337 loc) · 10.1 KB
/
intermediate.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
package gostreams
import (
"context"
"errors"
"slices"
"sync"
)
// Function returns the result of applying an operation to elem.
// It is the context-free signature that FuncMapper and FuncPredicate adapt to
// MapperFunc and PredicateFunc respectively.
type Function[T any, U any] func(elem T) U
// MapperFunc maps element elem to type U.
// The index is the 0-based index of elem, in the order produced by the upstream producer.
// ctx and cancel are the context and cancel function the calling operation was itself
// invoked with. Map and MapConcurrent discard the returned value if ctx is done by the
// time the call returns.
type MapperFunc[T any, U any] func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64) U
// PredicateFunc returns true if elem matches a predicate.
// The index is the 0-based index of elem, in the order produced by the upstream producer.
// ctx and cancel are the context and cancel function the calling operation was itself
// invoked with. Filter and FilterConcurrent ignore the result if ctx is done by the time
// the call returns.
type PredicateFunc[T any] func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64) bool
// CompareFunc returns a negative number if a < b, a positive number if a > b, and zero if a == b.
// Sort uses it as the comparison for slices.SortFunc, forwarding the ctx and cancel that
// Sort's producer was invoked with.
type CompareFunc[T any] func(ctx context.Context, cancel context.CancelCauseFunc, a T, b T) int
// SeenFunc is a function that returns true if elem has been seen before.
// DistinctSeen calls it once for every upstream element and forwards only the elements
// for which it returns false. The map-backed implementation used by Distinct records
// elem during the call, so calling a SeenFunc may have side effects.
type SeenFunc[T any] func(elem T) bool
// ErrLimitReached is the error used to short-circuit a stream by canceling its context to indicate that
// the maximum number of elements given to Limit has been reached.
// Limit passes it as the cancellation cause of the derived context it hands to its upstream
// producer; the context Limit itself was invoked with is not canceled.
var ErrLimitReached = errors.New("limit reached")
// FuncMapper adapts the plain function mapp to the MapperFunc signature.
// The returned mapper ignores its context, cancel function and index arguments and
// simply returns mapp(elem).
func FuncMapper[T any, U any](mapp Function[T, U]) MapperFunc[T, U] {
	adapter := func(_ context.Context, _ context.CancelCauseFunc, value T, _ uint64) U {
		return mapp(value)
	}

	return adapter
}
// FuncPredicate adapts the plain boolean function pred to the PredicateFunc signature.
// The returned predicate ignores its context, cancel function and index arguments and
// simply returns pred(elem).
func FuncPredicate[T any](pred Function[T, bool]) PredicateFunc[T] {
	adapter := func(_ context.Context, _ context.CancelCauseFunc, candidate T, _ uint64) bool {
		return pred(candidate)
	}

	return adapter
}
// Map returns a producer that applies mapp to every element received from prod and
// produces the results, of type U, in upstream order.
//
// The index handed to mapp starts at 0 and advances by one for each element forwarded
// downstream. The producer closes its channel when prod is exhausted or as soon as ctx is
// done; if ctx is done right after mapp returns, that call's result is discarded.
func Map[T any, U any](prod ProducerFunc[T], mapp MapperFunc[T, U]) ProducerFunc[U] {
	return func(ctx context.Context, cancel context.CancelCauseFunc) <-chan U {
		mapped := make(chan U)

		go func() {
			defer close(mapped)

			var next uint64
			for in := range prod(ctx, cancel) {
				out := mapp(ctx, cancel, in, next)
				// The mapper itself may have canceled the stream; do not forward its result then.
				if contextDone(ctx) {
					return
				}

				select {
				case <-ctx.Done():
					return
				case mapped <- out:
				}
				next++
			}
		}()

		return mapped
	}
}
// MapConcurrent returns a producer that applies mapp to the elements of prod concurrently,
// starting one goroutine per upstream element, and produces the results of type U in
// undefined order.
//
// The index handed to mapp is the 0-based position at which the element was received from
// prod. A result is dropped if ctx is done once mapp returns. The output channel is closed
// only after every started goroutine has finished.
func MapConcurrent[T any, U any](prod ProducerFunc[T], mapp MapperFunc[T, U]) ProducerFunc[U] {
	return func(ctx context.Context, cancel context.CancelCauseFunc) <-chan U {
		results := make(chan U)

		go func() {
			defer close(results)

			var workers sync.WaitGroup

			// mapOne maps a single element and tries to hand the result downstream.
			mapOne := func(in T, position uint64) {
				defer workers.Done()

				out := mapp(ctx, cancel, in, position)
				if contextDone(ctx) {
					return
				}

				select {
				case <-ctx.Done():
				case results <- out:
				}
			}

			var received uint64
			for in := range prod(ctx, cancel) {
				workers.Add(1)
				go mapOne(in, received)
				received++
			}

			// Keep the output channel open until no worker can send on it anymore.
			workers.Wait()
		}()

		return results
	}
}
// FlatMap returns a producer that calls mapp for each element produced by prod, mapping it to an intermediate producer
// that produces elements of type U. The new producer produces all elements produced by the intermediate producers, in order.
//
// All upstream elements are mapped first; only then are the intermediate producers combined
// with Join and started. If ctx is done after a call to mapp returns (for example because
// mapp canceled the stream), FlatMap stops right away and closes its output without
// starting any intermediate producer, matching how Map, Filter and Peek react to cancellation.
func FlatMap[T any, U any](prod ProducerFunc[T], mapp MapperFunc[T, ProducerFunc[U]]) ProducerFunc[U] {
	return func(ctx context.Context, cancel context.CancelCauseFunc) <-chan U {
		outCh := make(chan U)

		go func() {
			defer close(outCh)

			prods := []ProducerFunc[U]{}
			index := uint64(0)
			for elem := range prod(ctx, cancel) {
				mapped := mapp(ctx, cancel, elem, index)
				// ctx.Err() is non-nil exactly when ctx is done. Once the stream is canceled
				// there is no point in mapping further elements or starting producers.
				if ctx.Err() != nil {
					return
				}
				prods = append(prods, mapped)
				index++
			}

			joined := Join(prods...)
			for elem := range joined(ctx, cancel) {
				select {
				case outCh <- elem:
				case <-ctx.Done():
					return
				}
			}
		}()

		return outCh
	}
}
// FlatMapConcurrent returns a producer that calls mapp for each element produced by prod, mapping it to an intermediate producer
// that produces elements of type U. The new producer produces all elements produced by the intermediate producers, in undefined order.
//
// All upstream elements are mapped first, sequentially; only then are the intermediate
// producers combined with JoinConcurrent and started. If ctx is done after a call to mapp
// returns (for example because mapp canceled the stream), FlatMapConcurrent stops right
// away and closes its output without starting any intermediate producer.
func FlatMapConcurrent[T any, U any](prod ProducerFunc[T], mapp MapperFunc[T, ProducerFunc[U]]) ProducerFunc[U] {
	return func(ctx context.Context, cancel context.CancelCauseFunc) <-chan U {
		outCh := make(chan U)

		go func() {
			defer close(outCh)

			prods := []ProducerFunc[U]{}
			index := uint64(0)
			for elem := range prod(ctx, cancel) {
				mapped := mapp(ctx, cancel, elem, index)
				// ctx.Err() is non-nil exactly when ctx is done. Once the stream is canceled
				// there is no point in mapping further elements or starting producers.
				if ctx.Err() != nil {
					return
				}
				prods = append(prods, mapped)
				index++
			}

			joined := JoinConcurrent(prods...)
			for elem := range joined(ctx, cancel) {
				select {
				case outCh <- elem:
				case <-ctx.Done():
					return
				}
			}
		}()

		return outCh
	}
}
// Filter returns a producer that evaluates filter for every element received from prod
// and produces, in upstream order, only those elements for which filter returns true.
//
// The index handed to filter is the 0-based position of the element in the upstream
// sequence, counting rejected elements too. The producer stops and closes its channel as
// soon as ctx is done, including when that happens during a call to filter.
func Filter[T any](prod ProducerFunc[T], filter PredicateFunc[T]) ProducerFunc[T] {
	return func(ctx context.Context, cancel context.CancelCauseFunc) <-chan T {
		accepted := make(chan T)

		go func() {
			defer close(accepted)

			var position uint64
			for candidate := range prod(ctx, cancel) {
				keep := filter(ctx, cancel, candidate, position)
				// The predicate itself may have canceled the stream.
				if contextDone(ctx) {
					return
				}
				position++

				if keep {
					select {
					case <-ctx.Done():
						return
					case accepted <- candidate:
					}
				}
			}
		}()

		return accepted
	}
}
// FilterConcurrent returns a producer that evaluates filter for the elements of prod
// concurrently, starting one goroutine per upstream element, and produces only those
// elements for which filter returns true. It produces elements in undefined order.
//
// The index handed to filter is the 0-based position at which the element was received
// from prod. An element is dropped if ctx is done once filter returns. The output channel
// is closed only after every started goroutine has finished.
func FilterConcurrent[T any](prod ProducerFunc[T], filter PredicateFunc[T]) ProducerFunc[T] { //nolint:gocognit // goroutine handling is a bit more involved
	return func(ctx context.Context, cancel context.CancelCauseFunc) <-chan T {
		accepted := make(chan T)

		go func() {
			defer close(accepted)

			var workers sync.WaitGroup

			// check evaluates the predicate for one element and forwards it on a match.
			check := func(candidate T, position uint64) {
				defer workers.Done()

				keep := filter(ctx, cancel, candidate, position)
				if contextDone(ctx) || !keep {
					return
				}

				select {
				case <-ctx.Done():
				case accepted <- candidate:
				}
			}

			var received uint64
			for candidate := range prod(ctx, cancel) {
				workers.Add(1)
				go check(candidate, received)
				received++
			}

			// Keep the output channel open until no worker can send on it anymore.
			workers.Wait()
		}()

		return accepted
	}
}
// Peek returns a producer that hands every element received from prod to peek, in order,
// and then produces that same element unchanged.
//
// The index handed to peek starts at 0 and advances by one for each element forwarded
// downstream. The producer stops and closes its channel as soon as ctx is done, including
// when that happens during a call to peek.
func Peek[T any](prod ProducerFunc[T], peek ConsumerFunc[T]) ProducerFunc[T] {
	return func(ctx context.Context, cancel context.CancelCauseFunc) <-chan T {
		forwarded := make(chan T)

		go func() {
			defer close(forwarded)

			var position uint64
			for item := range prod(ctx, cancel) {
				peek(ctx, cancel, item, position)
				// The consumer itself may have canceled the stream.
				if contextDone(ctx) {
					return
				}

				select {
				case <-ctx.Done():
					return
				case forwarded <- item:
				}
				position++
			}
		}()

		return forwarded
	}
}
// Limit returns a producer that forwards the elements of prod, in order, and stops after
// max elements have been forwarded.
//
// prod runs under a context derived from ctx. When the limit is reached that derived
// context is canceled with ErrLimitReached as its cause; ctx itself (the rest of the
// stream) is left untouched. prod is started even when max is 0, and is then canceled
// immediately.
func Limit[T any](prod ProducerFunc[T], max uint64) ProducerFunc[T] {
	return func(ctx context.Context, cancel context.CancelCauseFunc) <-chan T {
		limited := make(chan T)

		go func() {
			defer close(limited)

			upstreamCtx, stopUpstream := context.WithCancelCause(ctx)
			// Release the derived context on every exit path; a no-op once a cause is set.
			defer stopUpstream(nil)

			upstream := prod(upstreamCtx, cancel)

			remaining := max
			for remaining > 0 {
				item, open := <-upstream
				if !open {
					// Upstream ran dry before the limit was reached.
					return
				}

				select {
				case <-ctx.Done():
					return
				case limited <- item:
				}
				remaining--
			}

			stopUpstream(ErrLimitReached)
		}()

		return limited
	}
}
// Skip returns a producer that drops the first num elements received from prod and
// produces every later element unchanged, in order.
// The producer stops and closes its channel when prod is exhausted or ctx is done.
func Skip[T any](prod ProducerFunc[T], num uint64) ProducerFunc[T] {
	return func(ctx context.Context, cancel context.CancelCauseFunc) <-chan T {
		remainder := make(chan T)

		go func() {
			defer close(remainder)

			var received uint64
			for item := range prod(ctx, cancel) {
				received++
				// Only elements past the first num are forwarded.
				if received > num {
					select {
					case <-ctx.Done():
						return
					case remainder <- item:
					}
				}
			}
		}()

		return remainder
	}
}
// Sort returns a producer that consumes elements from prod, sorts them using cmp, and produces them in sorted order.
//
// All upstream elements are buffered in memory before the first element is produced.
// Sorting uses slices.SortFunc, which is not stable, so the relative order of elements
// that compare equal is unspecified. If ctx is done once prod has been drained, or becomes
// done while sorting (for example because cmp canceled the stream), Sort closes its output
// without producing anything.
func Sort[T any](prod ProducerFunc[T], cmp CompareFunc[T]) ProducerFunc[T] {
	return func(ctx context.Context, cancel context.CancelCauseFunc) <-chan T {
		outCh := make(chan T)

		go func() {
			defer close(outCh)

			result := []T{}
			for elem := range prod(ctx, cancel) {
				result = append(result, elem)
			}
			// ctx.Err() is non-nil exactly when ctx is done. A canceled stream may have
			// delivered only part of its elements; sorting them would be wasted work.
			if ctx.Err() != nil {
				return
			}

			slices.SortFunc(result, func(a T, b T) int {
				return cmp(ctx, cancel, a, b)
			})
			// cmp receives cancel and may have aborted the stream mid-sort, in which case
			// the slice order is meaningless and nothing must be emitted.
			if ctx.Err() != nil {
				return
			}

			for _, elem := range result {
				select {
				case outCh <- elem:
				case <-ctx.Done():
					return
				}
			}
		}()

		return outCh
	}
}
// Distinct returns a producer that produces those elements produced by prod which are distinct.
// Only the first occurrence of each element is produced.
//
// Every run of the returned producer tracks seen elements in a map of its own, so the
// producer can be started more than once without an earlier run hiding elements from a
// later one, and separate runs never share the map.
func Distinct[T comparable](prod ProducerFunc[T]) ProducerFunc[T] {
	return func(ctx context.Context, cancel context.CancelCauseFunc) <-chan T {
		// Build the seen-set here rather than once in Distinct: state captured at
		// construction time would leak between runs of the same producer.
		return DistinctSeen(prod, seenMap[T]())(ctx, cancel)
	}
}
// DistinctSeen returns a producer that asks seen about every element received from prod
// and produces, in order, only the elements for which seen returns false.
// seen is called from a single goroutine, once per upstream element. The producer stops
// and closes its channel when prod is exhausted or ctx is done.
func DistinctSeen[T any](prod ProducerFunc[T], seen SeenFunc[T]) ProducerFunc[T] {
	return func(ctx context.Context, cancel context.CancelCauseFunc) <-chan T {
		unique := make(chan T)

		go func() {
			defer close(unique)

			for item := range prod(ctx, cancel) {
				if !seen(item) {
					select {
					case <-ctx.Done():
						return
					case unique <- item:
					}
				}
			}
		}()

		return unique
	}
}
// seenMap returns a SeenFunc backed by a map: the first call for a given element records
// it and reports false, every later call for an equal element reports true.
// The returned function is not synchronized.
func seenMap[T comparable]() SeenFunc[T] {
	known := make(map[T]struct{})

	return func(elem T) bool {
		_, duplicate := known[elem]
		if !duplicate {
			known[elem] = struct{}{}
		}

		return duplicate
	}
}
// Identity returns a mapper that hands back each element unchanged, ignoring the context,
// cancel function and index it is called with.
func Identity[T any]() MapperFunc[T, T] {
	return func(_ context.Context, _ context.CancelCauseFunc, elem T, _ uint64) T {
		return elem
	}
}