-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil_stream.ml
335 lines (305 loc) · 7.12 KB
/
util_stream.ml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
(*
General-purpose stream functions
*)
(** [fold_left f acc0 st] consumes [st] from front to back, threading an
    accumulator through [f]: the result is [f (... (f (f acc0 x1) x2) ...) xn].
    [st] is exhausted afterwards. *)
let fold_left f acc0 st =
  let state = ref acc0 in
  let step elt = state := f !state elt in
  Stream.iter step st;
  !state
(** [to_list_rev st] drains [st] and returns its elements, last one first. *)
let to_list_rev st =
  st |> fold_left (fun rev_acc elt -> elt :: rev_acc) []
(** [to_list st] drains [st] and returns its elements in stream order. *)
let to_list st =
  to_list_rev st |> List.rev
(* Element-level helpers, and streams created from line-formatted data files. *)
(** [pop st] removes and returns the first element of [st] as [Some x], or
    returns [None] (consuming nothing) when [st] is empty. *)
let pop st =
  let head = Stream.peek st in
  begin match head with
  | Some _ -> Stream.junk st
  | None -> ()
  end;
  head
(** [iteri f st] consumes [st], calling [f i x] on each element [x] where
    [i] is its zero-based position among the elements visited. *)
let iteri f st =
  let index = ref 0 in
  let visit x =
    f !index x;
    index := !index + 1
  in
  Stream.iter visit st
(** [stream_of_channel ic] is a stream of the lines of [ic], read lazily
    with [input_line] (so line terminators are stripped). When end of file
    is reached, [ic] is closed with [close_in] and the stream ends. *)
let stream_of_channel ic =
  Stream.from (fun _count ->
    match input_line ic with
    | line -> Some line
    | exception End_of_file ->
        close_in ic;
        None)
(** [stream_of_file file] opens [file] with [open_in] and returns
    [(lines, close)], where [lines] is [stream_of_channel] applied to the new
    channel and [close ()] closes that channel with [close_in_noerr]. The
    [close] function can be used to release the file before the stream has
    been fully consumed. *)
let stream_of_file file =
  let ic = open_in file in
  let lines = stream_of_channel ic in
  (lines, fun () -> close_in_noerr ic)
(** [map f st] is the 1:1 mapping of [st]: it lazily yields [f x] for each
    element [x] of [st]. Consuming the result consumes [st]. *)
let map f st =
  Stream.from (fun _count ->
    match pop st with
    | Some x -> Some (f x)
    | None -> None)
(** [mapi f st] is like [map], but [f] also receives the count supplied by
    [Stream.from], i.e. the zero-based position of the element in the
    resulting stream. *)
let mapi f st =
  let produce index =
    match pop st with
    | None -> None
    | Some x -> Some (f index x)
  in
  Stream.from produce
(** [filter f st] lazily yields the elements of [st] for which [f] returns
    [true]; the other elements are consumed and dropped. *)
let filter f st =
  let rec next_kept count =
    match pop st with
    | None -> None
    | Some v as kept when f v -> kept
    | Some _ -> next_kept count
  in
  Stream.from next_kept
(** [select f st] combines map and filter: for each element [x] of [st] it
    yields [y] when [f x = Some y] and skips [x] when [f x = None]. *)
let select f st =
  let rec next_selected count =
    match pop st with
    | None -> None
    | Some v ->
        (match f v with
         | None -> next_selected count
         | Some _ as picked -> picked)
  in
  Stream.from next_selected
(** [skip_to f st] lazily discards leading elements of [st] until one
    satisfies [f]. That element and every later element are then passed
    through unchanged; later elements are not tested with [f]. *)
let skip_to f st =
  let found = ref false in
  let rec next count =
    if !found then pop st
    else
      match pop st with
      | None -> None
      | Some v as first when f v ->
          found := true;
          first
      | Some _ -> next count
  in
  Stream.from next
(** [really_skip_to ?discard f st] eagerly removes elements from the front
    of [st] until its head satisfies [f] or [st] is empty. Each removed
    element is passed to [discard] (default: ignore). The first matching
    element is left in [st]. Returns the number of elements removed. *)
let really_skip_to ?(discard = fun _ -> ()) f st =
  let rec drop_until count =
    match Stream.peek st with
    | None -> count
    | Some v when f v -> count
    | Some v ->
        Stream.junk st;
        discard v;
        drop_until (count + 1)
  in
  drop_until 0
(** [flatten f st] maps each element of [st] to a list with [f] and yields
    the elements of those lists in order, as a single stream. [f] is
    applied lazily, one source element at a time, and empty lists
    contribute nothing. *)
let flatten f st =
  let pending = Queue.create () in
  let rec next count =
    if not (Queue.is_empty pending) then Some (Queue.pop pending)
    else
      match pop st with
      | None -> None
      | Some x ->
          List.iter (fun y -> Queue.push y pending) (f x);
          next count
  in
  Stream.from next
(** [concat l] is the lazy concatenation of the streams in [l]: every
    element of the first stream, then of the second, and so on. *)
let concat l =
  match l with
  | [] -> Stream.from (fun _ -> None)
  | first :: rest ->
      let current = ref first in
      let remaining = ref rest in
      let rec next count =
        match Stream.next !current with
        | x -> Some x
        | exception Stream.Failure ->
            (* Current stream is exhausted: move on to the next one. *)
            (match !remaining with
             | [] -> None
             | st :: others ->
                 current := st;
                 remaining := others;
                 next count)
      in
      Stream.from next
(** [reduce cmp f st] groups consecutive [(key, data)] pairs of [st] (which
    should be sorted, or at least grouped, by key). For each maximal run of
    pairs whose keys compare equal to the run's first key under [cmp], the
    result yields [f k values]: [k] is that first key and [values] are the
    run's data in input order. *)
let reduce cmp f st =
  let group_key = ref None in
  let group_rev = ref [] in
  let start_group key data =
    group_key := Some key;
    group_rev := [data]
  in
  let close_group k0 = f k0 (List.rev !group_rev) in
  let rec next count =
    let item = pop st in
    match !group_key with
    | None ->
        (match item with
         | None -> None
         | Some (k, data) ->
             start_group k data;
             next count)
    | Some k0 ->
        (match item with
         | Some (k, data) when cmp k k0 = 0 ->
             group_rev := data :: !group_rev;
             next count
         | Some (k, data) ->
             let result = close_group k0 in
             start_group k data;
             Some result
         | None ->
             let result = close_group k0 in
             group_key := None;
             group_rev := [];
             Some result)
  in
  Stream.from next
(** [merge_full cmp init fold l] merges streams of [(key, value)] pairs, each
    sorted by key according to [cmp]. For every distinct smallest head key
    [key], it yields [(key, acc)], where [acc] starts as [init key] and
    [fold] is applied to every value with a key equal to [key]. Values are
    taken stream by stream in the order of [l], and in stream order within
    each stream. On ties, the key reported is the one from the earliest
    stream in [l]. *)
let merge_full cmp init fold l =
  (* Smallest key at the head of any stream, found without consuming. *)
  let smallest_head_key () =
    List.fold_left
      (fun best st ->
         match Stream.peek st, best with
         | None, _ -> best
         | Some (key, _), None -> Some key
         | Some (key, _), Some key0 ->
             if cmp key0 key > 0 then Some key else best)
      None l
  in
  (* Consume the leading elements of [st] whose key equals [key],
     folding their values into [acc]. *)
  let rec absorb key st acc =
    match Stream.peek st with
    | Some (k, v) when cmp key k = 0 ->
        Stream.junk st;
        absorb key st (fold acc v)
    | Some _ | None -> acc
  in
  let next _count =
    match smallest_head_key () with
    | None -> None
    | Some key ->
        let result =
          List.fold_left (fun acc st -> absorb key st acc) (init key) l
        in
        Some (key, result)
  in
  Stream.from next
(*
  Merge streams whose elements are each sorted according to [cmp] into a
  single stream sorted according to [cmp]. This is a simplified version of
  merge_full that flattens the values found under the same key.
  The merge is stable: elements that compare equal under [cmp] are emitted
  in the order of their streams in [streams], and in their original order
  within a single stream.
*)
let merge cmp streams =
  (* Each element serves as its own key, so merge_full groups by [cmp]. *)
  let kv_streams =
    List.rev (
      List.rev_map (fun stream ->
        map (fun x -> (x, x)) stream
      ) streams
    )
  in
  let grouped_by_key =
    merge_full
      cmp
      (fun _ -> [])
      (fun acc v -> v :: acc)
      kv_streams
  in
  (* The fold above conses, so each group's values are in reverse arrival
     order. Reverse them back so that equal elements keep their input order
     (previously they came out reversed). *)
  flatten (fun (_, rev_values) -> List.rev rev_values) grouped_by_key
(* Self-test for [merge]. Each case pairs the input lists with the expected
   merged output. Returns [true] on success; a failing case raises
   [Assert_failure]. *)
let test_merge () =
  let run ll = to_list (merge compare (List.map Stream.of_list ll)) in
  let check (input, expected) = assert (run input = expected) in
  List.iter check [
    ([], []);
    ([[]], []);
    ([[]; []; []], []);
    ([[1; 2; 3]; [4; 5]], [1; 2; 3; 4; 5]);
    ([[1; 3]; [2; 4; 5]], [1; 2; 3; 4; 5]);
    ([[2; 4; 5]; [1; 3]], [1; 2; 3; 4; 5]);
    ([[1; 3; 6]; [4; 5]; [2; 7]], [1; 2; 3; 4; 5; 6; 7]);
    ([[2; 4; 4; 5]; [1; 2; 3]], [1; 2; 2; 3; 4; 4; 5]);
  ];
  true
(* Self-test for [merge_full]. It uses unit values, so merging amounts to
   deduplicating the union of the sorted input lists. Returns [true] on
   success; a failing case raises [Assert_failure]. *)
let test_merge_full () =
  let deduplicate ll =
    let to_kv_stream l = Stream.of_list (List.map (fun x -> (x, ())) l) in
    let merged =
      merge_full compare (fun _ -> ()) (fun () () -> ())
        (List.map to_kv_stream ll)
    in
    List.map fst (to_list merged)
  in
  List.iter
    (fun (input, expected) -> assert (deduplicate input = expected))
    [ ([], []);
      ([[1; 3; 6]; [4; 5]; [2; 7]], [1; 2; 3; 4; 5; 6; 7]);
      ([[2; 4; 4; 5]; [1; 2; 3]], [1; 2; 3; 4; 5]) ];
  true
(* Named self-tests for this module; each returns [true] on success. *)
let tests =
  [ ("merge", test_merge);
    ("merge_full", test_merge_full) ]