-
Notifications
You must be signed in to change notification settings - Fork 0
/
iterable-queue-mapper.ts
139 lines (112 loc) · 4.45 KB
/
iterable-queue-mapper.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
/* eslint-disable no-console */
import { IterableQueueMapper } from '@shutterstock/p-map-iterable';
import AggregateError from 'aggregate-error';
import { promisify } from 'util';
/**
 * Promise-returning delay: `await sleep(ms)` resumes after roughly `ms` milliseconds.
 * Built from Node's promisified `setTimeout`; resolves with `undefined`.
 */
const sleep: (ms: number) => Promise<void> = promisify(setTimeout);
/**
 * Async iterable that simulates a slow listing of remote files to download.
 *
 * Each pass yields up to `max` descriptors named `00000.json`, `00001.json`, ... whose
 * `fileSizeGB` is a random integer in [10, 99], fixed when the instance is constructed.
 * Before each item it waits `33 * (i % 10)` ms to mimic a slow producer. All per-pass
 * state is local to the generator, so the iterable can be consumed more than once and
 * every pass sees the same files.
 */
class SleepIterator implements AsyncIterable<{ fileName: string; fileSizeGB: number }> {
  private readonly _max: number;
  private readonly _files: readonly number[];

  /**
   * @param max - Maximum number of files to yield per iteration pass.
   * @param fileCount - Number of simulated remote files to create (default 100). A pass
   *   yields at most `fileCount` items even when `max` is larger.
   */
  constructor(max: number, fileCount = 100) {
    this._max = max;
    // Create a list of remote files to download.
    // Each file has a random integer size between 10 and 99 GB (inclusive).
    this._files = Array.from({ length: fileCount }, () => Math.floor(Math.random() * 90) + 10);
  }

  async *[Symbol.asyncIterator](): AsyncIterator<{ fileName: string; fileSizeGB: number }> {
    for (let i = 0; i < this._max; i++) {
      const fileSizeGB = this._files[i];
      if (fileSizeGB === undefined) {
        // Ran out of simulated files before reaching `max`.
        break;
      }
      // Take some time to generate the next item
      await sleep(33 * (i % 10));
      yield { fileName: `${i.toString().padStart(5, '0')}.json`, fileSizeGB };
    }
  }
}
/**
 * Demonstrates `IterableQueueMapper` as a bounded prefetcher.
 *
 * A background task enqueues simulated file descriptors from `SleepIterator`. The mapper
 * "downloads" each one with `concurrency: 2` and `maxUnread: 10`, while the loop below
 * consumes the results one at a time. Mapper calls for files whose size is a multiple of 5
 * throw. With `stopOnMapperError: false`, those failures are presumably collected and
 * thrown from the consuming loop (the catch below checks for `AggregateError`) — confirm
 * against the library docs.
 */
async function main() {
  const max = 100;
  const iterator = new SleepIterator(max);
  // GB whose mapper has finished but whose result the consumer loop has not yet processed:
  // incremented by the mapper on success, decremented by the consumer after processing.
  let queuedButUnreadFileSizeGB = 0;
  let callCount = 0;

  // Create an item prefetcher with IterableQueueMapper
  const prefetcher = new IterableQueueMapper(
    async (value: {
      fileName: string;
      fileSizeGB: number;
    }): Promise<{ fileName: string; fileSizeGB: number; status: number }> => {
      const myCallCount = callCount++;
      console.log(
        `Mapper Call Start ${myCallCount}, FileName: ${value.fileName}, FileSizeGB: ${value.fileSizeGB}`,
      );
      // Simulate fetching an async file with varied delays
      // for example: fetching from AWS S3
      await sleep(Math.random() * 500);
      // Injected failure: every file whose size is a multiple of 5 "fails to download".
      if (value.fileSizeGB % 5 === 0) {
        throw new Error(`Simulated error ${myCallCount}`);
      }
      console.log(
        `Mapper Call Done ${myCallCount}, FileName: ${value.fileName}, FileSizeGB: ${value.fileSizeGB}`,
      );
      queuedButUnreadFileSizeGB += value.fileSizeGB;
      return { ...value, status: 200 };
    },
    { concurrency: 2, maxUnread: 10, stopOnMapperError: false },
  );

  let loopCount = 0;

  // Formats the progress line shared by the "Result Start" and "Result Done" messages,
  // reading the counters at call time.
  const describeResult = (
    phase: 'Start' | 'Done',
    item: { fileName: string; fileSizeGB: number },
  ): string =>
    `Result ${phase} ${loopCount.toString().padStart(3, '0')}, FileName: ${
      item.fileName
    }, FileSizeGB: ${item.fileSizeGB}, QueuedButUnreadFileSizeGB: ${queuedButUnreadFileSizeGB
      .toString()
      .padStart(3, '0')}`;

  // Add items to the queue in the background
  const jobAdder = (async () => {
    try {
      for await (const item of iterator) {
        console.log(`Enqueue Start FileName: ${item.fileName}, FileSizeGB: ${item.fileSizeGB}`);
        await prefetcher.enqueue(item);
        console.log(`Enqueue Done FileName: ${item.fileName}, FileSizeGB: ${item.fileSizeGB}`);
      }
    } finally {
      // Signal completion even if the source iterator or `enqueue` throws. Otherwise the
      // consuming loop below would wait forever for items that will never arrive.
      prefetcher.done();
    }
  })();
  // Attach a handler immediately so a jobAdder failure is not reported as an unhandled
  // rejection while the consumer loop is still running. `await jobAdder` below still
  // rethrows it.
  void jobAdder.catch(() => undefined);

  // Loop through the prefetched items or batches.
  // Will pause and wait for prefetch to complete if none is available.
  // Will immediately start processing if a prefetched item or batch is already available.
  // If the `maxUnread` count was hit then no prefetches will be in progress until an item
  // is returned from the iterable here. Once an item is returned a mapper
  // will be started; this will repeat until `concurrency` mappers are running again.
  try {
    for await (const item of prefetcher) {
      loopCount++;
      console.log(describeResult('Start', item));
      // Simulate taking some time to process this big file of stuff
      // This is where we're going to keep the JS thread busy
      await sleep(Math.random() * 500);
      queuedButUnreadFileSizeGB -= item.fileSizeGB;
      console.log(describeResult('Done', item));
    }
  } catch (error: unknown) {
    if (error instanceof AggregateError) {
      console.log('Error is AggregateError');
    }
    console.log('CAUGHT ERRORS AFTER LOOP Start');
    console.log(error instanceof Error ? error.message : String(error));
    console.log('CAUGHT ERRORS AFTER LOOP Done');
  }

  // Wait for the job adder to finish adding the jobs
  // (its throughput is constrained by the prefetcher's concurrency)
  await jobAdder;
  console.log(`QueuedButUnreadFileSizeGB: ${queuedButUnreadFileSizeGB}`);
}
// Run the example. Report a failure explicitly instead of relying on Node's
// unhandled-rejection mode, which can be configured to only warn and exit with status 0.
void main().catch((error: unknown) => {
  console.error(error);
  process.exitCode = 1;
});