Skip to content

Commit c161a19

Browse files
committed
Merge branch 'master' into develop
2 parents 182d0ef + 85933cc commit c161a19

File tree

1 file changed

+62
-11
lines changed

1 file changed

+62
-11
lines changed

lib/stream-to-async-iterator.js

Lines changed: 62 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,15 @@ export const states = {
1111
errored: Symbol('errored'),
1212
};
1313

14+
/*
15+
* A contract for a promise that requires a clean up
16+
* function be called after the promise finishes.
17+
*/
18+
type PromiseWithCleanUp<T> = {
19+
promise: Promise<T>,
20+
cleanup: () => void,
21+
}
22+
1423
/**
1524
* @typedef {Object} StreamToAsyncIterator~Options
1625
* @property {number} [size] - the size of each read from the stream for each iteration
@@ -111,9 +120,22 @@ export default class StreamToAsyncIterator<TVal> {
111120
*/
112121
async next(): Promise<Iteration<TVal>> {
113122
if (this._state === states.notReadable) {
123+
const read = this._untilReadable();
124+
const end = this._untilEnd();
125+
114126
//need to wait until the stream is readable or ended
115-
await Promise.race([this._untilReadable(), this._untilEnd()]);
116-
return this.next();
127+
try {
128+
await Promise.race([read.promise, end.promise]);
129+
return this.next();
130+
}
131+
catch (e) {
132+
throw e
133+
}
134+
finally {
135+
//need to clean up any hanging event listeners
136+
read.cleanup()
137+
end.cleanup()
138+
}
117139
} else if (this._state === states.ended) {
118140
return {done: true};
119141
} else if (this._state === states.errored) {
@@ -138,34 +160,63 @@ export default class StreamToAsyncIterator<TVal> {
138160
* @private
139161
* @returns {Promise}
140162
*/
141-
_untilReadable(): Promise<void> {
142-
return new Promise((resolve, reject) => {
143-
const handleReadable = () => {
163+
_untilReadable(): PromiseWithCleanUp<void> {
164+
//let is used here instead of const because the exact reference is
165+
//required to remove it, this is why it is not a curried function that
166+
//accepts resolve & reject as parameters.
167+
let eventListener = null;
168+
169+
const promise = new Promise((resolve, reject) => {
170+
eventListener = () => {
144171
this._state = states.readable;
145172
this._rejections.delete(reject);
173+
174+
// we set this to null to info the clean up not to do anything
175+
eventListener = null;
146176
resolve();
147177
};
148178

149-
this._stream.once('readable', handleReadable);
179+
//on is used here instead of once, because
180+
//the listener is remove afterwards anyways.
181+
this._stream.once('readable', eventListener);
150182
this._rejections.add(reject);
151183
});
184+
185+
const cleanup = () => {
186+
if (eventListener == null) return;
187+
this._stream.removeListener('readable', eventListener);
188+
};
189+
190+
return { cleanup, promise }
152191
}
153192

154193
/**
155194
* Waits until the stream is ended. Rejects if the stream errored out.
156195
* @private
157196
* @returns {Promise}
158197
*/
159-
_untilEnd(): Promise<void> {
160-
return new Promise((resolve, reject) => {
161-
const handleEnd = () => {
198+
_untilEnd(): PromiseWithCleanUp<void> {
199+
let eventListener = null;
200+
201+
const promise = new Promise((resolve, reject) => {
202+
eventListener = () => {
162203
this._state = states.ended;
163204
this._rejections.delete(reject);
205+
206+
eventListener = null
164207
resolve();
165208
};
166-
this._stream.once('end', handleEnd);
209+
210+
this._stream.once('end', eventListener);
167211
this._rejections.add(reject);
168-
})
212+
});
213+
214+
const cleanup = () => {
215+
if (eventListener == null) return;
216+
this._stream.removeListener('end', eventListener);
217+
};
218+
219+
return { cleanup, promise }
169220
}
170221
}
171222

0 commit comments

Comments
 (0)