forked from johnkerl/miller
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathaaa_chain_transformer.go
294 lines (275 loc) · 10.1 KB
/
aaa_chain_transformer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
package transformers
import (
"container/list"
"fmt"
"github.com/johnkerl/miller/pkg/cli"
"github.com/johnkerl/miller/pkg/types"
"os"
)
// ================================================================
// ChainTransformer is a refinement of Miller's high-level sketch in stream.go.
// As far as stream.go is concerned, the transformer-chain is a box which reads
// from an input-record channel (from the record-reader object) and writes to
// an output-record channel (to the record-writer object). Inside that box is a
// bit more complexity, including channels between transformers in the chain.
//
// ----------------------------------------------------------------
// Channel structure from outside the box:
// * readerInputRecordChannel "ichan" passes records from the record-reader
// to the transformer chain.
// * writerOutputRecordChannel "ochan" passes records from the transformer chain
// to the record-writer.
// * readerDownstreamDoneChannel "dchan" signifies back to the record-reader
// that it can stop reading files, since no downstream consumer is
// interested in any more data.
//
// Record-reader
// | ichan ^ dchan
// v |
// +---------------+
// | Transformer 0 |
// | Transformer 1 |
// | Transformer 2 |
// | Transformer 3 |
// +---------------+
// | ochan
// v
// Record-writer
//
// ----------------------------------------------------------------
// Channel structure from inside the box:
//
// Record-reader
// | ichan ^ dchan
// v |
// +---------------+
// | Transformer 0 |
// | | ^ |
// | v | |
// | Transformer 1 |
// | | ^ |
// | v | |
// | Transformer 2 |
// | | ^ |
// | v | |
// | Transformer 3 |
// +---------------+
// | ochan
// v
// Record-writer
//
// ----------------------------------------------------------------
// Each transformer has four channels from its point of view:
//
// | irchan odchan |
// | | ^ |
// | v | |
// | Transformer i |
// | | ^ |
// | v | |
// | orchan idchan |
//
// * inputRecordChannel "irchan" is where it reads records from.
// o If the transformer is the first in the chain, this is the
// readerInputRecordChannel passed into ChainTransformer.
// o Otherwise it's an intermediary channel from transformer i-1's output,
// created and connected by ChainTransformer.
//
// * outputRecordChannel "orchan" is where it writes records to.
// o If the transformer is the last in the chain, this is the
// writerOutputRecordChannel passed into ChainTransformer.
// o Otherwise it's an intermediary channel to transformer i+1's input,
// created and connected by ChainTransformer.
//
// * inputDownstreamDoneChannel "idchan" is where it reads a downstream-done flag from.
// o These are all created internally by ChainTransformer.
// o If the transformer is the last in the chain, nothing writes to its idchan.
// o Otherwise transformer i's idchan is connected to transformer i+1's odchan,
// so transformer i can accept a downstream-done flag.
//
// * outputDownstreamDoneChannel "odchan" is where it writes a downstream-done flag to.
// o If the transformer is the first in the chain, this is the
// readerDownstreamDoneChannel passed into ChainTransformer.
// o Otherwise it's an intermediary channel connected to transformer i-1's idchan,
// so transformer i can produce a downstream-done flag.
//
// ----------------------------------------------------------------
// Handling in practice:
//
// * Most verbs pass a downstream-done flag on their idchan upstream to their odchan.
// The HandleDefaultDownstreamDone function is for this purpose: most verbs use it.
//
// * The exceptional verbs are HEAD, TEE, and SEQGEN.
//
// * mlr head is the reason this exists. The problem to solve is that people
// want to do 'mlr head -n 10 myhugefile.dat' and have mlr exit as soon as those
// 10 records have been read.
//
// * However, if someone does 'mlr cut -f foo then tee bar.dat then head -n 10',
// they want bar.dat to have all the records seen by tee; head -n 10 should produce
// only the first 10 but bar.dat should have them all.
//
// * Likewise, 'mlr seqgen --stop 1000000000 then head -n 10' should result
// in seqgen breaking out of its data-production loop.
//
// * In head.go, tee.go, and seqgen.go you will see specific handling of
// reading idchan and writing odchan.
//
// ----------------------------------------------------------------
// TESTING
//
// * This is a bit awkward with regard to regression-test -- we don't want a
// multi-GB data file in our repo for the continuous integration job to check
// that the processing finishes quickly.
//
// * Nonetheless: all these should finish quickly:
//
// mlr head -n 10 ~/tmp/huge
// mlr cat then head -n 10 ~/tmp/huge
// mlr head -n 100 then tee foo.txt then head -n 10 ~/tmp/huge
// check `wc -l foo.txt` is 100
// mlr head -n 100 then tee foo.txt then head -n 10 then tee bar.txt ~/tmp/huge
// check `wc -l foo.txt` is 100 and `wc -l bar.txt` is 10
// mlr seqgen --stop 100000000 then head -n 10
//
// ================================================================
// ChainTransformer wires up and launches the transformer chain. From the
// outside (stream.go) the chain is a single stage between the record-reader
// and the record-writer; in here, every transformer gets its own goroutine,
// and neighboring transformers are joined by intermediary channels.
//
// Wiring, for a chain of n transformers:
//   - Transformer 0 reads records from readerRecordChannel and writes its
//     downstream-done flag to readerDownstreamDoneChannel.
//   - Transformer i > 0 reads records from the channel transformer i-1 writes
//     to, and writes its downstream-done flag to the channel transformer i-1
//     reads its flag from.
//   - Transformer n-1 writes records to writerRecordChannel.
//   - Every transformer gets its own downstream-done input channel; nothing in
//     this function writes to the one belonging to the last transformer.
//
// All intermediary channels are created here with a buffer size of 1.
//
// The function returns as soon as the goroutines are started; it does not
// wait for them. recordTransformers must be non-empty: with an empty slice the
// record-channel slice would have length -1 and make panics.
func ChainTransformer(
	readerRecordChannel <-chan *list.List, // list of *types.RecordAndContext
	readerDownstreamDoneChannel chan<- bool, // for mlr head -- see also stream.go
	recordTransformers []IRecordTransformer, // not *recordTransformer since this is an interface
	writerRecordChannel chan<- *list.List, // list of *types.RecordAndContext
	options *cli.TOptions,
) {
	numTransformers := len(recordTransformers)
	lastIndex := numTransformers - 1

	// recordLinks[k] carries batches (lists of *types.RecordAndContext) from
	// transformer k to transformer k+1.
	recordLinks := make([]chan *list.List, lastIndex)
	for k := range recordLinks {
		recordLinks[k] = make(chan *list.List, 1)
	}

	// doneLinks[k] is the channel transformer k reads its downstream-done flag
	// from; transformer k+1 is the writer.
	doneLinks := make([]chan bool, numTransformers)
	for k := range doneLinks {
		doneLinks[k] = make(chan bool, 1)
	}

	for idx, transformer := range recordTransformers {
		// Upstream side: where this transformer reads records from (irchan),
		// and where it signals that it will ignore further input (odchan).
		var irchan <-chan *list.List
		var odchan chan<- bool
		if idx == 0 {
			irchan = readerRecordChannel
			odchan = readerDownstreamDoneChannel
		} else {
			irchan = recordLinks[idx-1]
			odchan = doneLinks[idx-1]
		}

		// Downstream side: where this transformer writes transformed records.
		var orchan chan<- *list.List
		if idx < lastIndex {
			orchan = recordLinks[idx]
		} else {
			orchan = writerRecordChannel
		}

		// Where this transformer learns that downstream transformers are done
		// (e.g. mlr head).
		idchan := doneLinks[idx]

		go runSingleTransformer(
			transformer,
			idx == 0,
			irchan,
			orchan,
			idchan,
			odchan,
			options,
		)
	}
}
// runSingleTransformer is the per-transformer goroutine body. It receives one
// batch at a time from inputRecordChannel and passes it, along with the other
// arguments, to runSingleTransformerBatch. It returns after the first batch
// for which runSingleTransformerBatch reports end of record stream.
func runSingleTransformer(
	recordTransformer IRecordTransformer,
	isFirstInChain bool,
	inputRecordChannel <-chan *list.List, // list of *types.RecordAndContext
	outputRecordChannel chan<- *list.List, // list of *types.RecordAndContext
	inputDownstreamDoneChannel <-chan bool,
	outputDownstreamDoneChannel chan<- bool,
	options *cli.TOptions,
) {
	for {
		batch := <-inputRecordChannel
		sawEndOfStream := runSingleTransformerBatch(
			batch,
			recordTransformer,
			isFirstInChain,
			outputRecordChannel,
			inputDownstreamDoneChannel,
			outputDownstreamDoneChannel,
			options,
		)
		if sawEndOfStream {
			return
		}
	}
}
// runSingleTransformerBatch runs one batch of input through a single
// transformer and sends exactly one output batch on outputRecordChannel.
//
// Parameters:
//   - inputRecordsAndContexts: the batch to process; each element is a
//     *types.RecordAndContext.
//   - recordTransformer: the transformer to apply.
//   - isFirstInChain: true for the first transformer in the chain; only that
//     one emits --nr-progress-mod output.
//   - outputRecordChannel: receives the output batch.
//   - inputDownstreamDoneChannel, outputDownstreamDoneChannel: passed through
//     unchanged to recordTransformer.Transform.
//   - options: consulted here only for NRProgressMod.
//
// Items after an end-of-stream marker within the same batch are not processed.
//
// Returns true on end of record stream, i.e. if the batch contained an
// end-of-stream marker; false otherwise.
func runSingleTransformerBatch(
	inputRecordsAndContexts *list.List, // list of types.RecordAndContext
	recordTransformer IRecordTransformer,
	isFirstInChain bool,
	outputRecordChannel chan<- *list.List, // list of *types.RecordAndContext
	inputDownstreamDoneChannel <-chan bool,
	outputDownstreamDoneChannel chan<- bool,
	options *cli.TOptions,
) bool {
	outputRecordsAndContexts := list.New()
	done := false

	// --nr-progress-mod: neither operand changes during the batch, so decide
	// once here rather than re-testing on every record.
	reportProgress := options.NRProgressMod != 0 && isFirstInChain

	for e := inputRecordsAndContexts.Front(); e != nil; e = e.Next() {
		inputRecordAndContext := e.Value.(*types.RecordAndContext)

		if reportProgress && inputRecordAndContext.Record != nil {
			context := &inputRecordAndContext.Context
			if context.NR%options.NRProgressMod == 0 {
				fmt.Fprintf(os.Stderr, "NR=%d FNR=%d FILENAME=%s\n", context.NR, context.FNR, context.FILENAME)
			}
		}

		// Three things can come through:
		//
		// * End-of-stream marker
		// * Non-nil records to be printed
		// * Strings to be printed from put/filter DSL print/dump/etc
		//   statements. They are handled here rather than fmt.Println directly
		//   in the put/filter handlers since we want all print statements and
		//   record-output to be in the same goroutine, for deterministic
		//   output ordering.
		//
		// The first two are passed to the transformer. The third we send along
		// the output channel without involving the record-transformer, since
		// there is no record to be transformed.
		if inputRecordAndContext.EndOfStream || inputRecordAndContext.Record != nil {
			recordTransformer.Transform(
				inputRecordAndContext,
				outputRecordsAndContexts,
				// TODO: maybe refactor these out of each transformer.
				// And/or maybe poll them once per batch not once per record.
				inputDownstreamDoneChannel,
				outputDownstreamDoneChannel,
			)
		} else {
			outputRecordsAndContexts.PushBack(inputRecordAndContext)
		}

		// Nothing after the end-of-stream marker is processed.
		if inputRecordAndContext.EndOfStream {
			done = true
			break
		}
	}

	outputRecordChannel <- outputRecordsAndContexts
	return done
}