-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.go
207 lines (173 loc) · 6.03 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
package streams
import (
"bufio"
)
// Predicate is used to Filter elements from streams
type Predicate func(element interface{}) bool
// Reducer is used to Reduce elements from streams into a single element
type Reducer func(first, second interface{}) interface{}
// Mapper is used to Map elements from streams from one thing to another
type Mapper func(element interface{}) interface{}
// FlatMapper is used to Map elements from streams from one thing to a slice of things
// which will then be flattened into a Stream
type FlatMapper func(element interface{}) []interface{}
// Consumer is used to perform some process ForEach element in streams
type Consumer func(element interface{})
// Collector is used by streams.Collect
// Add is used to add stream elements to the collection
// Complete returns the collection
type Collector interface {
Add(subject interface{})
Complete() interface{}
}
// Stream is the underlying data type that the Streams struct uses
type Stream chan interface{}
// Streams is the struct that stores the state of the different Streams
// In general, there will be 1 Stream in streams for each
// Filter / Map / ForEachThen that has been called on this
// Streams object thus far. Each will have a buffer of size
// channelBuffer.
type Streams struct {
streams []Stream
channelBuffer int
}
func toStream(collection []interface{}) Stream {
ch := make(chan interface{}, len(collection))
go func() {
defer close(ch)
for _, element := range collection {
ch <- element
}
}()
return ch
}
// FromCollection creates a streams object from the given slice.
// The channel buffer size will be set to the size of the slice
// If the data being processed is large enough that a slice would be
// impractical, use FromStream instead
func FromCollection(collection []interface{}) *Streams {
startStream := toStream(collection)
return FromStream(startStream, len(collection))
}
// FromStream creates a streams object from the given channel
// Future Stream objects in the streams object will be created with
// a buffer size of bufferSize
func FromStream(stream Stream, bufferSize int) *Streams {
streams := Streams{[]Stream{stream}, bufferSize}
return &streams
}
// FromScanner creates a streams object from the given channel.
// Each element in the stream represents one scanner.Scan() and scanner.Text().
// The elements will be of type string.
// Future Stream objects in the streams object will be created with
// a buffer size of bufferSize
func FromScanner(scanner *bufio.Scanner, bufferSize int) *Streams {
ch := make(Stream, bufferSize)
go func() {
defer close(ch)
for scanner.Scan() {
ch <- scanner.Text()
}
}()
streams := Streams{
[]Stream{ch},
bufferSize,
}
return &streams
}
func addNewStream(streams *Streams) (current, next Stream) {
current = streams.streams[len(streams.streams)-1]
next = make(Stream, streams.channelBuffer)
streams.streams = append(streams.streams, next)
return
}
// Filter asynchronously filters the elements in the streams using the provided Predicate.
// Elements that cause the Predicate to evaluate to true are kept,
// elements that cause the Predicate to evaluate to false are discarded.
func (streams *Streams) Filter(predicate Predicate) *Streams {
current, next := addNewStream(streams)
go func() {
defer close(next)
for object := range current {
if predicate(object) {
next <- object
}
}
}()
return streams
}
// Map asynchronously transforms the elements in the streams using the provided Mapper.
// Use this to turn the elements of the stream from one thing into another thing
func (streams *Streams) Map(mapper Mapper) *Streams {
current, next := addNewStream(streams)
go func() {
defer close(next)
for object := range current {
next <- mapper(object)
}
}()
return streams
}
// FlatMap asynchronously transforms the elements in the streams using the provided
// FlatMapper. Use this to turn each element in a stream into 0 or more elements.
func (streams *Streams) FlatMap(mapper FlatMapper) *Streams {
current, next := addNewStream(streams)
go func() {
defer close(next)
for object := range current {
for _, element := range mapper(object) {
next <- element
}
}
}()
return streams
}
func (streams *Streams) lastStream() Stream {
return streams.streams[len(streams.streams)-1]
}
// Reduce the elements in the stream to a singe element
// The single element can be of a different element, but it should
// probably be the same type as initial, or else your Reducer function
// will be ugly.
// The first parameter will always be the reduction thus far, and for the first
// iteration, it will be the initial parameter passed into Reduce
func (streams *Streams) Reduce(initial interface{}, reducer Reducer) interface{} {
for element := range streams.lastStream() {
initial = reducer(initial, element)
}
return initial
}
// Collect the elements in the stream in a single collection like a slice or
// a map. This function is out of place because it accepts an interface
// instead of a function.
// Calls Collector.Add on each element of the stream, then returns Collector.Complete
func (streams *Streams) Collect(collector Collector) interface{} {
lastStream := streams.lastStream()
for element := range lastStream {
collector.Add(element)
}
return collector.Complete()
}
// ForEach calls consumer(element) on each element on the stream
func (streams *Streams) ForEach(consumer Consumer) {
lastStream := streams.lastStream()
for element := range lastStream {
consumer(element)
}
}
// ForEachThen calls consumer(element) on each element of the stream, returns the streams
// for future use.
// As far as I can tell, there is no instance where it is more efficient
// to do ForEachThen().ForEach() rather than combining the consumer functions
// That said, this can be more readable, and it allows you to Collect / Reduce after
func (streams *Streams) ForEachThen(consumer Consumer) *Streams {
current, next := addNewStream(streams)
go func() {
defer close(next)
for element := range current {
consumer(element)
next <- element
}
}()
return streams
}