-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathterminal.go
173 lines (129 loc) · 5.47 KB
/
terminal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package gostreams
import (
"context"
"errors"
"sync"
)
// ConsumerFunc consumes element elem.
// The index is the 0-based index of elem, in the order produced by the upstream producer.
type ConsumerFunc[T any] func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64)
// CollectorFunc folds element elem into an internal accumulator and returns the result so far.
// The index is the 0-based index of elem, in the order produced by the upstream producer.
type CollectorFunc[T any, R any] func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64) R
// ErrShortCircuit is a generic error used to short-circuit a stream by canceling its context.
var ErrShortCircuit = errors.New("short circuit")
// Reduce calls coll for each element produced by prod, and returns the final result.
// If the stream's context is canceled, it returns the result so far, and the cause of the cancellation.
// If the cause of cancellation was ErrShortCircuit, it returns a nil error instead.
func Reduce[T any, R any](ctx context.Context, prod ProducerFunc[T], coll CollectorFunc[T, R]) (R, error) {
var result R
err := Each(ctx, prod, func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64) {
result = coll(ctx, cancel, elem, index)
})
return result, err
}
// ReduceSlice returns a slice of all elements produced by prod.
// If the stream's context is canceled, it returns the collected elements so far, and the cause of the cancellation.
// If the cause of cancellation was ErrShortCircuit, it returns a nil error instead.
func ReduceSlice[T any](ctx context.Context, prod ProducerFunc[T]) ([]T, error) {
return Reduce(ctx, prod, CollectSlice[T]())
}
// Each calls each for each element produced by prod.
// If the stream's context is canceled, it returns the cause of the cancellation.
// If the cause of cancellation was ErrShortCircuit, it returns a nil error instead.
func Each[T any](ctx context.Context, prod ProducerFunc[T], each ConsumerFunc[T]) error {
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)
ch := prod(ctx, cancel)
index := uint64(0)
for elem := range ch {
each(ctx, cancel, elem, index)
if contextDone(ctx) {
break
}
index++
}
err := context.Cause(ctx)
if errors.Is(err, ErrShortCircuit) {
err = nil
}
return err
}
// EachConcurrent concurrently calls each for each element produced by prod.
// If the stream's context is canceled, it returns the cause of the cancellation.
// If the cause of cancellation was ErrShortCircuit, it returns a nil error instead.
func EachConcurrent[T any](ctx context.Context, prod ProducerFunc[T], each ConsumerFunc[T]) error {
ctx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)
ch := prod(ctx, cancel)
index := uint64(0)
grp := sync.WaitGroup{}
for elem := range ch {
grp.Add(1)
go func(elem T, index uint64) {
defer grp.Done()
each(ctx, cancel, elem, index)
}(elem, index)
index++
}
grp.Wait()
err := context.Cause(ctx)
if errors.Is(err, ErrShortCircuit) {
err = nil
}
return err
}
// AnyMatch returns true as soon as pred returns true for an element produced by prod, that is, an element matches.
// If an element matches, it cancels the stream's context using ErrShortCircuit, and returns a nil error.
// If the stream's context is canceled with any other error, it returns an undefined result, and the cause of the cancellation.
func AnyMatch[T any](ctx context.Context, prod ProducerFunc[T], pred PredicateFunc[T]) (bool, error) {
anyMatch := false
err := Each(ctx, prod, func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64) {
if !pred(ctx, cancel, elem, index) {
return
}
anyMatch = true
cancel(ErrShortCircuit)
})
return anyMatch, err
}
// AllMatch returns true if pred returns true for all elements produced by prod, that is, all elements match.
// If any element doesn't match, it cancels the stream's context using ErrShortCircuit, and returns a nil error.
// If the stream's context is canceled with any other error, it returns an undefined result, and the cause of the cancellation.
func AllMatch[T any](ctx context.Context, prod ProducerFunc[T], pred PredicateFunc[T]) (bool, error) {
allMatch := true
err := Each(ctx, prod, func(ctx context.Context, cancel context.CancelCauseFunc, elem T, index uint64) {
if pred(ctx, cancel, elem, index) {
return
}
allMatch = false
cancel(ErrShortCircuit)
})
return allMatch, err
}
// Count returns the number of elements produced by prod.
// If the stream's context is canceled, it returns an undefined result, and the cause of the cancellation.
func Count[T any](ctx context.Context, prod ProducerFunc[T]) (uint64, error) {
count := uint64(0)
err := Each(ctx, prod, func(_ context.Context, _ context.CancelCauseFunc, _ T, _ uint64) {
count++
})
return count, err
}
// First returns the first element produced by prod.
// If prod produces an element, it cancels the stream's context using ErrShortCircuit, and returns
// the element, a true bool result, and a nil error.
// If prod does not produce elements, it returns a false bool result, and a nil error.
// If the stream's context is canceled, it returns an undefined result, and the cause of the cancellation.
func First[T any](ctx context.Context, prod ProducerFunc[T]) (T, bool, error) {
var (
first T
ok bool
)
err := Each(ctx, prod, func(_ context.Context, cancel context.CancelCauseFunc, elem T, _ uint64) {
first = elem
ok = true
cancel(ErrShortCircuit)
})
return first, ok, err
}